Client-go
1.Client-go的四种方式
- ClientSent
- DynamicClient
- RESTClient
- DiscoveryClient
一般我们使用前两种就够了,第三种也使用很少,第四种我们作为了解即可。通过下面的逻辑,我们可以看出,最终前面三门都会请求RestClient。最终RestClient去请求API server。重点可以了解ClientSet和DynamicClient。

package main
import (
"context"
"fmt"
"flag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
1.1 RestClient
RestClient是属于比较底层的客户端,下面通过RestClient来实现一个客户端的简单实现
package main
import (
"context"
"flag"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
//指定编码器
config.NegotiatedSerializer = scheme.Codecs
//初始化 restClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
//因为RestClient获取到的是一个json ,需要 unmashl
//需要创建一个空结构体,用户存储pod列表
podList := &corev1.PodList{}
restClient.Get().Namespace("monitoring").Resource("pods").VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).Do(context.Background()).Into(podList)
for _, pod := range podList.Items {
fmt.Println(pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
}
}
可以点Get()方法进去之后查看还有很多的方法,通过rest的方式实现。 比较底层,实现起来需要定义结构体, 也比较麻烦

1.2 ClientSet
源码地址:
https://github.com/kubernetes/client-go/blob/master/kubernetes/clientset.go
最常用的Client, 实现了所有k8s标准对象的接口
package main
import (
"context"
"flag"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/pointer"
)
// 通过clientSet实现创建一个服务
func main() {
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
//定义一个nginx deployment
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-clientset",
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32Ptr(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "nginx",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "nginx",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:1.7.9",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
},
},
},
},
},
},
},
}
//创建nginx deployment
deploymentClient := clientset.AppsV1().Deployments(corev1.NamespaceDefault)
result, err := deploymentClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
fmt.Printf("error: %s", err.Error())
}
fmt.Printf("Deployment created: %s", result.GetObjectMeta().GetName())
}
执行
go mod init && go run main.go
如何想要定义一个service,其实跟上面是类似的
总结: ClientSet基本上可以对所有k8s的资源进行增删改查,但是只有一种例外,就是crd。 ClientSet中封装的都是k8s标准的一些资源,所以不会自定义的crd资源封装,如果想要实现操作自定义crd资源,那么在下面的方式中,通过NynamicClient来实现,那么在下面的方式中我将会进行演示说明。
1.3DynamicClient
源码地址:
https://github.com/kubernetes/client-go/tree/master/dynamic
动态客户端: 可以对任何资源进行操作(包括标准k8s资源以及自定义crd)
返回的所有数据结构都是Unstructured struct
1.3.1 使用DynamicClient来获取资源
package main
import (
"flag"
"fmt"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
//可通过GVR来创建任何资源
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
//指定GVR
gvr := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}
//获取pod 列表,这里所谓的podlist是没有unStruct,是属于非结构化数据
unStructPodList, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
if err != nil {
fmt.Printf("error: %s" + err.Error())
}
//定义一个podList Struct
PodList := &corev1.PodList{}
//将Unstructured 转换为PodList
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unStructPodList.UnstructuredContent(), PodList)
if err != nil {
fmt.Printf("error: %s" + err.Error())
}
for _, Pod := range PodList.Items {
fmt.Println(Pod.Name, Pod.Namespace, Pod.Status.Phase)
}
}
1.3.2 读取yaml文件来创建资源
package main
import (
"context"
_ "embed"
"flag"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"strings"
)
var (
//go:embed nginx.yaml
deployYaml string
)
func main() {
//可通过GVR来创建任何资源
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
//解析yaml 转成Unstructured
deployObj := &unstructured.Unstructured{}
if err = yaml.Unmarshal([]byte(deployYaml), deployObj); err != nil {
fmt.Printf("error: %s", err.Error())
}
//指定GVR
apiVersion, found, err := unstructured.NestedString(deployObj.Object, "apiVersion")
if err != nil || !found {
fmt.Printf("error: %s", err.Error())
}
kind, found, err := unstructured.NestedString(deployObj.Object, "kind")
if err != nil || !found {
fmt.Printf("error: %s", err.Error())
}
gvr := schema.GroupVersionResource{}
versionParts := strings.Split(apiVersion, "/")
if len(versionParts) == 2 {
gvr.Group = versionParts[0]
gvr.Version = versionParts[1]
} else {
gvr.Group = versionParts[0]
}
switch kind {
case "Deployment":
gvr.Resource = "deployments"
case "pod":
gvr.Resource = "pods"
default:
fmt.Printf("Unsupported kind: %s", kind)
}
//使用dynamicClient创建资源
_, err = dynamicClient.Resource(gvr).Namespace("default").Create(context.TODO(), deployObj, v1.CreateOptions{})
if err != nil {
fmt.Printf("error: %s", err.Error())
}
fmt.Printf("Create deployment successfully")
}
Nginx.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-dynamic-2
spec:
selector:
matchLabels:
app: nginx
replicas: 2 # tells deployment to run 2 pods matching the template
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
2.Client-gowatch
2.1 使用go-watch实现监听
package main
import (
"context"
"flag"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
timeout := int64(600)
watcher, _ := clientSet.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{TimeoutSeconds: &timeout})
for event := range watcher.ResultChan() {
item := event.Object.(*corev1.Pod)
switch event.Type {
case watch.Added:
processPod(item.GetName())
case watch.Modified:
case watch.Deleted:
}
}
}
func processPod(podName string) {
fmt.Printf("processing pod %s\n", podName)
}
timeout := int64(600)
定义了watch的时间
为什么不推荐直接使用Watch
-
处理watch超时,断开重连等情况的处理比较复杂
-
Watch机制直接请求K8s API Server,增加了集群负载
-
对于希望对多个资源Watch时需要创建单独的连接而无法共享,增加资源消耗和集群负载
-
重连可能会导致事件丢失
-
大量事件产生时无限流逻辑,可能导致业务过载崩溃
-
业务获得事件信号后,如果处理失败,没有第二次处理机会
2.2 使用Informer代替
使用Informer的一些特点
-
基于Watch实现,提供更高层次的抽象,更简单,安全,高性能
-
自动处理超时和重连机制
-
本地缓存机制,无须频繁调用API Server
-
内置全量和增量同步机制,确保事件不丢失
-
可结合Rate Limiting和延迟队列,控制时间处理速率,避免业务过载,同时支持错误和重试
使用SharedlnformerFactory创建一个共享的Informer实例 可以实现监听多个资源,比如监听Deployment,Service等,这样可以减少网络和资源消耗,减轻K8s API负载
下面通过Informer来实现,可以解决上面watch的存在的一些问题,比如,informer可以通过缓存来减轻多api-server的压力
package main
import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"time"
)
func main() {
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
informerFactory := informers.NewSharedInformerFactory(clientSet, time.Minute*12) //全量同步时间
//deployment 可以对deployment进行监听
deploymentInformer := informerFactory.Apps().V1().Deployments()
informer := deploymentInformer.Informer()
deployLister := deploymentInformer.Lister()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fmt.Println("Add obj:")
},
UpdateFunc: func(old, new interface{}) {
fmt.Println("Update old:")
},
DeleteFunc: func(obj interface{}) {
fmt.Println("Delete obj:")
},
})
//service 可以对service进行监听
serviceInformer := informerFactory.Core().V1().Services()
informer = serviceInformer.Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fmt.Println("Add obj:")
},
UpdateFunc: func(old, new interface{}) {
fmt.Println("Update old")
},
DeleteFunc: func(obj interface{}) {
fmt.Println("Delete obj:")
},
})
stopper := make(chan struct{})
defer close(stopper)
//启动informer, List & watch
informerFactory.Start(stopper)
informerFactory.WaitForCacheSync(stopper) //等待Informer的所有缓存
deployments, err := deployLister.List(labels.Everything())
if err != nil {
fmt.Printf("error: %s", err.Error())
}
for idx, deploy := range deployments {
fmt.Println(idx, deploy.Name)
}
<-stopper
}
2.3 引入RateLimitingQueue
在上面的一个列子,EventHandler业务逻辑处理失败了怎么办?
基于事件驱动,没有二次处理机会
引入WorkQueue(RateLimitingQueue)处理业务逻辑: 错误重试,防止Hot Loop(过载)
package main
import (
"flag"
"fmt"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"time"
)
func main() {
kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
informerFactory := informers.NewSharedInformerFactory(clientSet, time.Second*30)
//ratelimitqueue
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
//对 deployment监听
deployInformer := informerFactory.Apps().V1().Deployments()
informer := deployInformer.Informer()
informer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { onAddDeployment(obj, queue) },
UpdateFunc: func(old, new interface{}) { onUpdateDeployment(new, queue) },
DeleteFunc: func(obj interface{}) { onDeleteDeployment(obj, queue) },
})
controller := NewController(queue, deployInformer.Informer().GetIndexer(), informer)
stopper := make(chan struct{})
defer close(stopper)
//启动informer
informerFactory.Start(stopper)
informerFactory.WaitForCacheSync(stopper)
//处理队列里的事件
go func() {
for {
if !controller.processNextItem() {
break
}
}
}()
<-stopper
}
type Controller struct {
indexer cache.Indexer
queue workqueue.TypedRateLimitingInterface[string]
informer cache.Controller
}
func NewController(qeue workqueue.TypedRateLimitingInterface[string], indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
indexer: indexer,
queue: qeue,
informer: informer,
}
}
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncToStdout(key)
c.handleErr(err, key)
return true
}
func (c *Controller) syncToStdout(key string) error {
//通过key 直接从indexer中获取对象
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
fmt.Printf("error getting object with key %s from store: % v\n", key, err)
return err
}
if !exists {
fmt.Printf("key %s does not exist anymore\n", key)
} else {
deployment := obj.(*appsv1.Deployment)
fmt.Printf("sync object %s to stdout\n", deployment.GetName())
if deployment.Name == "test-deployment" {
return fmt.Errorf("test-deployment is not allowed")
}
}
return nil
}
func (c *Controller) handleErr(err error, key string) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 5 {
fmt.Printf("Error syncing pod %v: %v", key, err)
c.queue.AddRateLimited(key)
}
c.queue.Forget(key)
fmt.Printf("Error syncing pod %v: %v", key, err)
}
func onAddDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}
func onUpdateDeployment(newobj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
key, err := cache.MetaNamespaceKeyFunc(newobj)
if err == nil {
queue.Add(key)
}
}
func onDeleteDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}